Replace BulkProcessor with custom RoutedBulkIndexer #1548

Merged
oblodgett merged 7 commits into stage from custom-bulk-processor
Apr 2, 2026
@oblodgett
Member

Summary

  • Replace ES BulkProcessor with custom RoutedBulkIndexer supporting shard routing, retry with backoff, and per-indexer stats
  • Switch from JSON to Smile binary format for bulk indexing (faster serialization, smaller payloads)
  • Consolidate 8 size-bucketed jsonQueues into a single shared queue
  • Spin up shardCount * 4 indexer threads for maximum ES throughput
  • Proper 429/failure retry instead of silent data loss; System.exit(-1) after max retries
  • ~460 lines removed, ~274 added
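
The retry behaviour described in the summary can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: `BatchSender`, `RetrySketch`, and `sendWithRetry` are hypothetical names, the sender stands in for the real Elasticsearch bulk call, and only a whole-request status code is inspected (a real bulk response can also report per-item failures, which this sketch ignores).

```java
import java.util.List;
import java.util.function.LongUnaryOperator;

// Hypothetical stand-in for the bulk call; returns an HTTP-style status code.
interface BatchSender {
    int send(List<byte[]> batch) throws Exception;
}

final class RetrySketch {
    /**
     * Sends one batch, retrying on any non-2xx status (such as 429) or exception.
     * Returns the number of attempts used, or -1 if every attempt failed
     * or the thread was interrupted while waiting.
     */
    static int sendWithRetry(BatchSender sender, List<byte[]> batch,
                             int maxRetries, LongUnaryOperator delayMillis) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                int status = sender.send(batch);
                if (status >= 200 && status < 300) {
                    return attempt + 1; // accepted
                }
                // Any other status falls through and is retried.
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
                // Transport failure: treated like a rejection and retried.
            }
            if (attempt < maxRetries) {
                try {
                    Thread.sleep(delayMillis.applyAsLong(attempt));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the flag for the caller
                    return -1;
                }
            }
        }
        return -1; // retries exhausted; the caller decides how to fail
    }
}
```

A return value of -1 is the point where the PR calls System.exit(-1). Returning a sentinel here just keeps the sketch testable without terminating the JVM.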

Test plan

  • Tested locally with indexing=false -- full pipeline runs correctly
  • Tested locally with indexing=true against local ES -- balanced BP distribution, ~17-20K r/s combined
  • Test on stage server with 16 shards, 24GB heap, NVMe

- New RoutedBulkIndexer class with shard routing, retry with backoff,
  and per-indexer stats (docStats, queueStats, esBatchRequestStats)
- Removes 8 BulkProcessors, 8 builders, 8 clients, VCFJsonBulkIndexer
- Each batch routed to a random shard for sequential writes
- Proper 429/failure retry instead of silent data loss
- System.exit(-1) after max retries exhausted to ensure data integrity
- Indexing flag read from config; pipeline runs in both modes
- ~300 lines of boilerplate removed from SourceDocumentCreation
@oblodgett oblodgett requested a review from a team as a code owner April 2, 2026 19:55
@claude

claude bot commented Apr 2, 2026

Code Review

Summary: This PR replaces 8 hardcoded BulkProcessor/queue/client instances with a single shared LinkedBlockingDeque<List<byte[]>> drained by N RoutedBulkIndexer threads, and switches serialization from JSON to Jackson Smile binary format. This is a significant simplification — ~400 lines of copy-paste boilerplate removed.

The changes look correct. A few observations:

Shutdown sequence is sound

The RoutedBulkIndexer correctly handles the interrupt-based shutdown: after exiting the main loop it clears the interrupt flag via Thread.interrupted() and flushes any pendingDocs that were polled but not yet submitted. The caller in SourceDocumentCreation waits for the queue to drain, then interrupts and joins — this cooperates properly with the indexer's flush logic.
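
For readers unfamiliar with the pattern, the shutdown handshake described above might look something like the sketch below. It is an assumption-laden illustration rather than the PR's code: `DrainingIndexerSketch`, `runOnce`, and the `flushedDocs` counter (standing in for an actual bulk request) are hypothetical, and the poll timeout and flush threshold are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical worker; the real RoutedBulkIndexer is not reproduced here.
final class DrainingIndexerSketch extends Thread {
    private final LinkedBlockingDeque<List<byte[]>> queue;
    private final int flushThreshold;
    private final AtomicInteger flushedDocs; // stands in for a real bulk request
    private final List<byte[]> pendingDocs = new ArrayList<>();

    DrainingIndexerSketch(LinkedBlockingDeque<List<byte[]>> queue,
                          int flushThreshold, AtomicInteger flushedDocs) {
        this.queue = queue;
        this.flushThreshold = flushThreshold;
        this.flushedDocs = flushedDocs;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                List<byte[]> batch = queue.poll(50, TimeUnit.MILLISECONDS);
                if (batch != null) {
                    pendingDocs.addAll(batch);
                }
                if (pendingDocs.size() >= flushThreshold) {
                    flush();
                }
            } catch (InterruptedException e) {
                // poll() cleared the flag when it threw; set it again so the loop exits.
                Thread.currentThread().interrupt();
            }
        }
        Thread.interrupted(); // clear the flag so the final flush is not cut short
        flush();              // submit docs that were polled but not yet sent
    }

    private void flush() {
        flushedDocs.addAndGet(pendingDocs.size());
        pendingDocs.clear();
    }

    // Caller side: enqueue, wait for the queue to drain, then interrupt and join.
    static int runOnce(int batches, int docsPerBatch, int flushThreshold) {
        LinkedBlockingDeque<List<byte[]>> queue = new LinkedBlockingDeque<>();
        AtomicInteger flushed = new AtomicInteger();
        DrainingIndexerSketch worker =
                new DrainingIndexerSketch(queue, flushThreshold, flushed);
        worker.start();
        try {
            for (int i = 0; i < batches; i++) {
                List<byte[]> batch = new ArrayList<>();
                for (int j = 0; j < docsPerBatch; j++) {
                    batch.add(new byte[] {(byte) j});
                }
                queue.put(batch);
            }
            while (!queue.isEmpty()) {
                Thread.sleep(5);
            }
            worker.interrupt();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while shutting down", e);
        }
        return flushed.get();
    }
}
```

The key detail is the order of operations after the loop: the interrupt flag is cleared before the last flush, so a blocking call inside a real flush would not fail immediately with InterruptedException.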

Minor observations (non-blocking)

  1. Commented-out code in JSONProducer — lines like //jsonDoc = sequenceWriter.writeValueAsString(ssd); and the large block of debug logging comments (~10 lines) could be cleaned up since they reference the old JSON path that no longer exists.

  2. System.exit(-1) on exhausted retries (RoutedBulkIndexer:264) — this is carried over from the old BulkProcessor listener pattern, so not a regression, but worth noting it will kill the entire JVM without cleanup of other threads.

  3. maxRetries=100 with exponential backoff: from retry 10 onward the sleep is capped at about 17 minutes per attempt ((1 << 10) * 1000 ms = 1,024 s). Retries 11-100 each sleep for that capped interval, so the worst case is roughly 26 hours of retrying before giving up. This is likely intentional for a long-running indexing job, just calling it out.
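
The backoff arithmetic in the third observation can be checked with a small calculation. The sketch below is not taken from RoutedBulkIndexer; it assumes a 1000 ms base delay, an exponent capped at 10, and retries numbered 1 through 100, all of which are inferred from the comment above rather than confirmed by the code.

```java
// Hypothetical helper for reasoning about the retry schedule.
final class BackoffSketch {
    static final int MAX_RETRIES = 100;    // from the review comment
    static final int CAP_EXPONENT = 10;    // assumed: exponent stops growing at 10
    static final long BASE_MILLIS = 1000L; // assumed base delay

    // Delay before the given retry: 2^min(retry, cap) * base.
    static long delayMillis(int retry) {
        int exponent = Math.min(retry, CAP_EXPONENT);
        return (1L << exponent) * BASE_MILLIS;
    }

    // Total time spent sleeping if every retry fails.
    static long worstCaseMillis() {
        long total = 0;
        for (int retry = 1; retry <= MAX_RETRIES; retry++) {
            total += delayMillis(retry);
        }
        return total;
    }
}
```

Under these assumptions the capped delay is 1,024,000 ms (about 17 minutes) and the total worst-case sleep is 94,206,000 ms, a little over 26 hours. The capped retries account for nearly all of that time.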

No correctness issues, no security concerns, no breaking API changes. LGTM.

🤖 Generated with Claude Code

Remove unnecessary parens in RoutedBulkIndexer, set non-MGD species
inactive in VariantFileSet.yaml for MGD-only run, and increase local
ES max_content_length to 1g.
@oblodgett oblodgett enabled auto-merge April 2, 2026 20:16
@oblodgett oblodgett merged commit 5a9295e into stage Apr 2, 2026
5 checks passed
@oblodgett oblodgett deleted the custom-bulk-processor branch April 2, 2026 20:31